{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kubeflow pipeline\n",
    "This is a fairly simple pipeline, containing sequential steps:\n",
    "\n",
    "1. Update data - implemented by lightbend/recommender-data-update-publisher:0.2 image\n",
    "2. Run model training. Ideally we would run TFJob, but due to the current limitations for pipelines, we will directly use an image implementing training lightbend/ml-tf-recommender:0.1\n",
    "3. Update serving model - implemented by lightbend/recommender-model-publisher:0.2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already up-to-date: kubernetes in ./.local/lib/python3.6/site-packages (10.0.1)\n",
      "Requirement already satisfied, skipping upgrade: pyyaml>=3.12 in /usr/local/lib/python3.6/dist-packages (from kubernetes) (5.3)\n",
      "Requirement already satisfied, skipping upgrade: six>=1.9.0 in /usr/lib/python3/dist-packages (from kubernetes) (1.11.0)\n",
      "Requirement already satisfied, skipping upgrade: urllib3>=1.24.2 in ./.local/lib/python3.6/site-packages (from kubernetes) (1.24.3)\n",
      "Requirement already satisfied, skipping upgrade: requests-oauthlib in /usr/local/lib/python3.6/dist-packages (from kubernetes) (1.3.0)\n",
      "Requirement already satisfied, skipping upgrade: certifi>=14.05.14 in /usr/local/lib/python3.6/dist-packages (from kubernetes) (2019.11.28)\n",
      "Requirement already satisfied, skipping upgrade: python-dateutil>=2.5.3 in /usr/local/lib/python3.6/dist-packages (from kubernetes) (2.8.1)\n",
      "Requirement already satisfied, skipping upgrade: setuptools>=21.0.0 in /usr/local/lib/python3.6/dist-packages (from kubernetes) (45.1.0)\n",
      "Requirement already satisfied, skipping upgrade: requests in /usr/local/lib/python3.6/dist-packages (from kubernetes) (2.22.0)\n",
      "Requirement already satisfied, skipping upgrade: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /usr/local/lib/python3.6/dist-packages (from kubernetes) (0.57.0)\n",
      "Requirement already satisfied, skipping upgrade: google-auth>=1.0.1 in /usr/local/lib/python3.6/dist-packages (from kubernetes) (1.11.0)\n",
      "Requirement already satisfied, skipping upgrade: oauthlib>=3.0.0 in /usr/local/lib/python3.6/dist-packages (from requests-oauthlib->kubernetes) (3.1.0)\n",
      "Requirement already satisfied, skipping upgrade: idna<2.9,>=2.5 in /usr/lib/python3/dist-packages (from requests->kubernetes) (2.6)\n",
      "Requirement already satisfied, skipping upgrade: chardet<3.1.0,>=3.0.2 in /usr/local/lib/python3.6/dist-packages (from requests->kubernetes) (3.0.4)\n",
      "Requirement already satisfied, skipping upgrade: pyasn1-modules>=0.2.1 in /usr/local/lib/python3.6/dist-packages (from google-auth>=1.0.1->kubernetes) (0.2.8)\n",
      "Requirement already satisfied, skipping upgrade: cachetools<5.0,>=2.0.0 in /usr/local/lib/python3.6/dist-packages (from google-auth>=1.0.1->kubernetes) (4.0.0)\n",
      "Requirement already satisfied, skipping upgrade: rsa<4.1,>=3.1.4 in /usr/local/lib/python3.6/dist-packages (from google-auth>=1.0.1->kubernetes) (4.0)\n",
      "Requirement already satisfied, skipping upgrade: pyasn1<0.5.0,>=0.4.6 in /usr/local/lib/python3.6/dist-packages (from pyasn1-modules>=0.2.1->google-auth>=1.0.1->kubernetes) (0.4.8)\n",
      "Requirement already up-to-date: kfp in ./.local/lib/python3.6/site-packages (0.2.2.1)\n",
      "Requirement already satisfied, skipping upgrade: PyJWT>=1.6.4 in ./.local/lib/python3.6/site-packages (from kfp) (1.7.1)\n",
      "Requirement already satisfied, skipping upgrade: requests-toolbelt>=0.8.0 in ./.local/lib/python3.6/site-packages (from kfp) (0.9.1)\n",
      "Requirement already satisfied, skipping upgrade: python-dateutil in /usr/local/lib/python3.6/dist-packages (from kfp) (2.8.1)\n",
      "Requirement already satisfied, skipping upgrade: PyYAML in /usr/local/lib/python3.6/dist-packages (from kfp) (5.3)\n",
      "Requirement already satisfied, skipping upgrade: kfp-server-api<=0.1.40,>=0.1.18 in ./.local/lib/python3.6/site-packages (from kfp) (0.1.40)\n",
      "Requirement already satisfied, skipping upgrade: google-cloud-storage>=1.13.0 in /usr/local/lib/python3.6/dist-packages (from kfp) (1.25.0)\n",
      "Requirement already satisfied, skipping upgrade: jsonschema>=3.0.1 in /usr/local/lib/python3.6/dist-packages (from kfp) (3.2.0)\n",
      "Requirement already satisfied, skipping upgrade: Deprecated in ./.local/lib/python3.6/site-packages (from kfp) (1.2.7)\n",
      "Requirement already satisfied, skipping upgrade: google-auth>=1.6.1 in /usr/local/lib/python3.6/dist-packages (from kfp) (1.11.0)\n",
      "Collecting kubernetes<=10.0.0,>=8.0.0\n",
      "  Using cached kubernetes-10.0.0-py2.py3-none-any.whl (1.5 MB)\n",
      "Requirement already satisfied, skipping upgrade: argo-models==2.2.1a in ./.local/lib/python3.6/site-packages (from kfp) (2.2.1a0)\n",
      "Requirement already satisfied, skipping upgrade: urllib3<1.25,>=1.15 in ./.local/lib/python3.6/site-packages (from kfp) (1.24.3)\n",
      "Requirement already satisfied, skipping upgrade: certifi in /usr/local/lib/python3.6/dist-packages (from kfp) (2019.11.28)\n",
      "Requirement already satisfied, skipping upgrade: tabulate==0.8.3 in ./.local/lib/python3.6/site-packages (from kfp) (0.8.3)\n",
      "Requirement already satisfied, skipping upgrade: click==7.0 in ./.local/lib/python3.6/site-packages (from kfp) (7.0)\n",
      "Requirement already satisfied, skipping upgrade: cloudpickle==1.1.1 in ./.local/lib/python3.6/site-packages (from kfp) (1.1.1)\n",
      "Requirement already satisfied, skipping upgrade: six>=1.10 in /usr/lib/python3/dist-packages (from kfp) (1.11.0)\n",
      "Requirement already satisfied, skipping upgrade: cryptography>=2.4.2 in ./.local/lib/python3.6/site-packages (from kfp) (2.8)\n",
      "Requirement already satisfied, skipping upgrade: requests<3.0.0,>=2.0.1 in /usr/local/lib/python3.6/dist-packages (from requests-toolbelt>=0.8.0->kfp) (2.22.0)\n",
      "Requirement already satisfied, skipping upgrade: google-resumable-media<0.6dev,>=0.5.0 in /usr/local/lib/python3.6/dist-packages (from google-cloud-storage>=1.13.0->kfp) (0.5.0)\n",
      "Requirement already satisfied, skipping upgrade: google-cloud-core<2.0dev,>=1.2.0 in /usr/local/lib/python3.6/dist-packages (from google-cloud-storage>=1.13.0->kfp) (1.3.0)\n",
      "Requirement already satisfied, skipping upgrade: setuptools in /usr/local/lib/python3.6/dist-packages (from jsonschema>=3.0.1->kfp) (45.1.0)\n",
      "Requirement already satisfied, skipping upgrade: attrs>=17.4.0 in /usr/local/lib/python3.6/dist-packages (from jsonschema>=3.0.1->kfp) (19.3.0)\n",
      "Requirement already satisfied, skipping upgrade: importlib-metadata; python_version < \"3.8\" in /usr/local/lib/python3.6/dist-packages (from jsonschema>=3.0.1->kfp) (1.4.0)\n",
      "Requirement already satisfied, skipping upgrade: pyrsistent>=0.14.0 in /usr/local/lib/python3.6/dist-packages (from jsonschema>=3.0.1->kfp) (0.15.7)\n",
      "Requirement already satisfied, skipping upgrade: wrapt<2,>=1.10 in /usr/local/lib/python3.6/dist-packages (from Deprecated->kfp) (1.11.2)\n",
      "Requirement already satisfied, skipping upgrade: rsa<4.1,>=3.1.4 in /usr/local/lib/python3.6/dist-packages (from google-auth>=1.6.1->kfp) (4.0)\n",
      "Requirement already satisfied, skipping upgrade: cachetools<5.0,>=2.0.0 in /usr/local/lib/python3.6/dist-packages (from google-auth>=1.6.1->kfp) (4.0.0)\n",
      "Requirement already satisfied, skipping upgrade: pyasn1-modules>=0.2.1 in /usr/local/lib/python3.6/dist-packages (from google-auth>=1.6.1->kfp) (0.2.8)\n",
      "Requirement already satisfied, skipping upgrade: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /usr/local/lib/python3.6/dist-packages (from kubernetes<=10.0.0,>=8.0.0->kfp) (0.57.0)\n",
      "Requirement already satisfied, skipping upgrade: requests-oauthlib in /usr/local/lib/python3.6/dist-packages (from kubernetes<=10.0.0,>=8.0.0->kfp) (1.3.0)\n",
      "Requirement already satisfied, skipping upgrade: cffi!=1.11.3,>=1.8 in ./.local/lib/python3.6/site-packages (from cryptography>=2.4.2->kfp) (1.14.0)\n",
      "Requirement already satisfied, skipping upgrade: chardet<3.1.0,>=3.0.2 in /usr/local/lib/python3.6/dist-packages (from requests<3.0.0,>=2.0.1->requests-toolbelt>=0.8.0->kfp) (3.0.4)\n",
      "Requirement already satisfied, skipping upgrade: idna<2.9,>=2.5 in /usr/lib/python3/dist-packages (from requests<3.0.0,>=2.0.1->requests-toolbelt>=0.8.0->kfp) (2.6)\n",
      "Requirement already satisfied, skipping upgrade: google-api-core<2.0.0dev,>=1.16.0 in /usr/local/lib/python3.6/dist-packages (from google-cloud-core<2.0dev,>=1.2.0->google-cloud-storage>=1.13.0->kfp) (1.16.0)\n",
      "Requirement already satisfied, skipping upgrade: zipp>=0.5 in /usr/local/lib/python3.6/dist-packages (from importlib-metadata; python_version < \"3.8\"->jsonschema>=3.0.1->kfp) (2.1.0)\n",
      "Requirement already satisfied, skipping upgrade: pyasn1>=0.1.3 in /usr/local/lib/python3.6/dist-packages (from rsa<4.1,>=3.1.4->google-auth>=1.6.1->kfp) (0.4.8)\n",
      "Requirement already satisfied, skipping upgrade: oauthlib>=3.0.0 in /usr/local/lib/python3.6/dist-packages (from requests-oauthlib->kubernetes<=10.0.0,>=8.0.0->kfp) (3.1.0)\n",
      "Requirement already satisfied, skipping upgrade: pycparser in ./.local/lib/python3.6/site-packages (from cffi!=1.11.3,>=1.8->cryptography>=2.4.2->kfp) (2.19)\n",
      "Requirement already satisfied, skipping upgrade: googleapis-common-protos<2.0dev,>=1.6.0 in /usr/local/lib/python3.6/dist-packages (from google-api-core<2.0.0dev,>=1.16.0->google-cloud-core<2.0dev,>=1.2.0->google-cloud-storage>=1.13.0->kfp) (1.51.0)\n",
      "Requirement already satisfied, skipping upgrade: protobuf>=3.4.0 in /usr/local/lib/python3.6/dist-packages (from google-api-core<2.0.0dev,>=1.16.0->google-cloud-core<2.0dev,>=1.2.0->google-cloud-storage>=1.13.0->kfp) (3.11.2)\n",
      "Requirement already satisfied, skipping upgrade: pytz in /usr/local/lib/python3.6/dist-packages (from google-api-core<2.0.0dev,>=1.16.0->google-cloud-core<2.0dev,>=1.2.0->google-cloud-storage>=1.13.0->kfp) (2019.3)\n",
      "Installing collected packages: kubernetes\n",
      "  Attempting uninstall: kubernetes\n",
      "    Found existing installation: kubernetes 10.0.1\n",
      "    Uninstalling kubernetes-10.0.1:\n",
      "      Successfully uninstalled kubernetes-10.0.1\n",
      "Successfully installed kubernetes-10.0.0\n"
     ]
    }
   ],
   "source": [
    "!pip install kubernetes --upgrade --user\n",
    "!pip install kfp --upgrade --user\n",
    "\n",
    "\n",
    "import kfp  # the Pipelines SDK.  This library is included with the notebook image.\n",
    "from kfp import compiler\n",
    "import kfp.dsl as dsl\n",
    "import kfp.notebook\n",
    "from kubernetes import client as k8s_client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create/Get an Experiment in the Kubeflow Pipeline System\n",
    "The Kubeflow Pipeline system requires an \"Experiment\" to group pipeline runs. You can create a new experiment, or call client.list_experiments() to get existing ones."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = kfp.Client()\n",
    "client.list_experiments()\n",
    "#exp = client.create_experiment(name='mdupdate')\n",
    "exp = client.get_experiment(experiment_name ='mdupdate')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Define a Pipeline\n",
    "Authoring a pipeline is like authoring a normal Python function. The pipeline function describes the topology of the pipeline.\n",
    "\n",
    "Each step in the pipeline is typically a ContainerOp --- a simple class or function describing how to interact with a docker container image. In the pipeline, all the container images referenced in the pipeline are already built."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(\n",
    "  name='Recommender model update',\n",
    "  description='Demonstrate usage of pipelines for multi-step model update'\n",
    ")\n",
    "def recommender_pipeline():\n",
    "    # Load new data\n",
    "  data = dsl.ContainerOp(\n",
    "      name='updatedata',\n",
    "      image='lightbend/recommender-data-update-publisher:0.2') \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL',value='http://minio-service.kubeflow.svc.cluster.local:9000')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))\n",
    "    # Train the model\n",
    "  train = dsl.ContainerOp(\n",
    "      name='trainmodel',\n",
    "      image='lightbend/ml-tf-recommender:0.1') \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL',value='minio-service.kubeflow.svc.cluster.local:9000')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123'))\n",
    "  train.after(data)\n",
    "    # Publish new model model\n",
    "  publish = dsl.ContainerOp(\n",
    "      name='publishmodel',\n",
    "      image='lightbend/recommender-model-publisher:0.2') \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_URL',value='http://minio-service.kubeflow.svc.cluster.local:9000')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_KEY', value='minio')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='MINIO_SECRET', value='minio123')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='KAFKA_BROKERS', value='cloudflow-kafka-brokers.cloudflow.svc.cluster.local:9092')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='DEFAULT_RECOMMENDER_URL', value='http://recommendermodelserver.kubeflow.svc.cluster.local:8501')) \\\n",
    "    .add_env_variable(k8s_client.V1EnvVar(name='ALTERNATIVE_RECOMMENDER_URL', value='http://recommendermodelserver1.kubeflow.svc.cluster.local:8501'))\n",
    "  publish.after(train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Compile pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "compiler.Compiler().compile(recommender_pipeline, 'pipeline.tar.gz')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Submit an experiment run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "Run link <a href=\"/pipeline/#/runs/details/df24284c-c7a1-480e-91b6-398bd352f164\" target=\"_blank\" >here</a>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "run = client.run_pipeline(exp.id, 'pipeline1', 'pipeline.tar.gz')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
